/**
 * @project: parallel-task
 * @package: com.ngplat.paralleltask.task
 * @filename: WorkUnit.java
 *
 * Copyright (c) 2018 eSunny Info. Tech Ltd. All rights reserved.
 * 
 */
package com.ngplat.paralleltask.task;

import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskTimeoutException;

import com.ngplat.paralleltask.common.Cleanable;
import com.ngplat.paralleltask.constants.TaskState;
import com.ngplat.paralleltask.exception.TaskExecutionException;
import com.ngplat.paralleltask.task.config.ThreadConfig;

/**
 * A group of work items (one "level" of tasks) that is submitted to an
 * executor and then awaited as a unit.
 *
 * <p>Tasks are submitted through {@link #submit(WorkerTask)}; the caller then
 * invokes {@link #waitForCompletion()} which consumes finished futures until
 * the completed-task counter reaches the expected total. Tasks that do not need
 * to run can be accounted for directly through
 * {@link #workFinished(WorkerTask, TaskState)}.
 *
 * <p>After {@link #cleanResource()} has run (it always runs at the end of
 * {@code waitForCompletion}) the internal future map and completion service are
 * {@code null}, so the instance must not be reused for further submissions.
 *
 * @author KI ZCQ
 * @version 1.0.0
 */
public class WorkUnit implements Cleanable {

	private static final Logger logger = LoggerFactory.getLogger(WorkUnit.class);

	private CompletionService<Void> completion;
	// Submitted futures mapped to their task, so outstanding work can be cancelled and reported.
	private Map<Future<Void>, WorkerTask> futureMap = new ConcurrentHashMap<Future<Void>, WorkerTask>();
	// Number of tasks that must be reported as finished before the wait ends.
	private int totalTaskCount;
	// Loop guard for waitForCompletion; the wait stops as soon as this is false.
	private AtomicBoolean continueFlag = new AtomicBoolean(true);
	// Number of tasks reported as finished so far.
	private AtomicInteger completedTaskCount = new AtomicInteger(0);
	// Non-null only when this instance created the pool itself and therefore has to shut it down.
	private ExecutorService ownedPool;

	/**
	 * Creates a work unit backed by its own single-thread pool. That pool is shut
	 * down when {@link #cleanResource()} runs.
	 *
	 * @param totalTaskCount number of tasks that must finish before the wait ends
	 */
	public WorkUnit(int totalTaskCount) {
		this(totalTaskCount, Executors.newFixedThreadPool(1), true);
	}

	/**
	 * Creates a work unit that runs its tasks on the supplied executor. The
	 * executor's lifecycle stays with the caller; it is never shut down here.
	 *
	 * @param totalTaskCount number of tasks that must finish before the wait ends
	 * @param pool           executor used to run submitted tasks
	 */
	public WorkUnit(int totalTaskCount, Executor pool) {
		this(totalTaskCount, pool, false);
	}

	/**
	 * Shared constructor body.
	 *
	 * @param ownsPool {@code true} only when {@code pool} was created by this class
	 *                 (in which case it is an {@link ExecutorService})
	 */
	private WorkUnit(int totalTaskCount, Executor pool, boolean ownsPool) {
		this.totalTaskCount = totalTaskCount;
		this.completion = new ExecutorCompletionService<Void>(pool);
		this.ownedPool = ownsPool ? (ExecutorService) pool : null;
	}

	/**
	 * Submits a task for execution and records its future.
	 *
	 * @param task task whose processor is handed to the completion service
	 * @throws NullPointerException   if the task has no processor
	 * @throws TaskExecutionException if {@code task.isExecuted()} is {@code false}
	 */
	public void submit(WorkerTask task) {
		TaskProcessor processor = task.getProcessor();
		if (processor == null) {
			throw new NullPointerException("任务处理器TaskProcessor为空, " + task.toString());
		}

		if (!task.isExecuted()) {
			throw new TaskExecutionException("提交了一个不可执行的任务, " + task.toString());
		}

		// NOTE(review): the future is registered only after it has been handed to the
		// executor, so a concurrent waiter could observe it before it is in the map.
		futureMap.put(completion.submit(processor, null), task);
	}

	/**
	 * Waits for completion using the timeout configured on
	 * {@code TaskPoolManager.DEFAULT}.
	 */
	public void waitForCompletion() {
		this.waitForCompletion(TaskPoolManager.DEFAULT.getConfig().getTaskTimeoutMillSeconds());
	}

	/**
	 * Waits until the completed-task counter reaches the expected total, the
	 * continue flag is cleared, or a failure ends the wait.
	 *
	 * <p>Timeouts, task failures and interruption are logged and terminate the
	 * wait; they are not propagated to the caller. On timeout or task failure all
	 * registered futures are cancelled. Resources are always released before this
	 * method returns normally.
	 *
	 * @param timeoutMillSeconds maximum time to wait for each next finished task,
	 *                           in milliseconds; {@code ThreadConfig.NOT_LIMIT}
	 *                           waits without a time limit
	 * @throws TaskExecutionException if the future map has already been released
	 *                                while tasks are still outstanding
	 */
	public void waitForCompletion(long timeoutMillSeconds) {

		while (completedTaskCount.get() < totalTaskCount && continueFlag.get()) {

			if (futureMap == null) {
				throw new TaskExecutionException("可执行任务为空, 请检查.");
			}

			try {
				if (futureMap.size() > 0) {
					Future<Void> future;
					try {
						if (timeoutMillSeconds == ThreadConfig.NOT_LIMIT) {
							// Block until a task finishes. A plain poll() returns null
							// immediately when nothing is done yet, which would be
							// misreported as a timeout below.
							future = completion.take();
						} else {
							future = completion.poll(timeoutMillSeconds, TimeUnit.MILLISECONDS);
						}
					} catch (InterruptedException e) {
						// Keep the interrupt visible to callers further up the stack.
						Thread.currentThread().interrupt();
						logger.error("wait for execute completion failed,e=", e);
						throw new RuntimeException(e);
					}

					if (future == null) {
						String errMsg = "Wait for execute completion timeout: " + this.futureInfo();
						processException(errMsg);
						throw new TaskTimeoutException(errMsg);
					}

					WorkerTask task = futureMap.get(future);
					// The processor's context carries any exception recorded for the task.
					TaskContext ctx = task.getProcessor().getContext();

					logger.info("任务执行完成, 任务信息: {}", task.toString());

					Exception exp = ctx.getException(task.getInfo().getTaskId());
					if (exp != null) {
						workFinished(task, TaskState.TASK_FAILED);
						String errMsg = "Task execute completion error, Please Check it: " + this.futureInfo();
						processException(errMsg);
						throw new TaskExecutionException(exp);
					}

					workFinished(task, TaskState.TASK_COMPLETE);
					futureMap.remove(future);
				} else {
					// Nothing is registered yet; the counter can still advance through
					// workFinished(). Yield so this loop does not monopolise a core.
					Thread.yield();
				}
			} catch (Exception e) {
				logger.error("wait for execute completion failed,e=", e);
				break;
			}

		}
		// Dropping the references only helps the GC; it is not an OS-level release.
		this.cleanResource();
	}

	/**
	 * Handles a failure: cancels every registered future, logs the message and
	 * releases resources.
	 *
	 * @param errMsg message to log
	 */
	private void processException(String errMsg) {
		this.cancelAllTask();
		logger.error(errMsg);
		this.cleanResource();
	}

	/**
	 * Records a task as finished and fires the corresponding state event. Can be
	 * called directly for tasks that do not need to be executed.
	 *
	 * @param task  finished task
	 * @param state state passed to {@code TaskPoolManager.DEFAULT.fireEvent}
	 * @throws NullPointerException if {@code task} is {@code null}
	 */
	public void workFinished(WorkerTask task, TaskState state) {
		if (task == null) {
			throw new NullPointerException("任务基本信息为空, 请检查.");
		}

		// Log the value produced by this increment rather than re-reading the counter,
		// which another thread may have advanced in between.
		int finished = completedTaskCount.incrementAndGet();
		logger.info("已完成:{}, 总:{}", finished, totalTaskCount);
		TaskPoolManager.DEFAULT.fireEvent(task.getInfo(), state);
	}

	/**
	 * Cancels every registered future, interrupting tasks that are running.
	 */
	private void cancelAllTask() {
		for (Map.Entry<Future<Void>, WorkerTask> entry : futureMap.entrySet()) {
			boolean isCancel = entry.getKey().cancel(true);
			logger.info("Cancel task success: {}: {}", isCancel, entry.getValue());
		}
	}

	/**
	 * Builds a description of all registered tasks for error messages.
	 *
	 * @return concatenated {@code toString()} of every registered task
	 */
	private String futureInfo() {
		StringBuilder sb = new StringBuilder();
		for (WorkerTask task : futureMap.values()) {
			sb.append(task.toString());
		}
		return sb.toString();
	}

	/**
	 * Releases this unit's references, resets its counters, shuts down the pool if
	 * this instance created it, and then resets {@code TaskPoolManager.DEFAULT}.
	 * Safe to call more than once.
	 *
	 * @see com.ngplat.paralleltask.common.Cleanable#cleanResource()
	 */
	@Override
	public void cleanResource() {
		this.futureMap = null;
		this.completion = null;
		continueFlag = new AtomicBoolean(true);
		completedTaskCount = new AtomicInteger(0);

		// Only a pool created by this class is shut down; a caller-supplied executor
		// is left alone. shutdown() lets already-submitted tasks finish.
		if (ownedPool != null) {
			ownedPool.shutdown();
			ownedPool = null;
		}

		// Reset the shared task pool manager.
		TaskPoolManager.DEFAULT.cleanResource();
	}

}
